Quick Start#

This quick guide shows how you can develop a scalable forecasting system using ForecastFlowML.

Scenario#

  • The dataset contains 10 stores, and you want to develop an individual model for each store.

  • Each store's data is small enough to fit into a single machine's memory, but the data for all 10 stores combined is large enough to cause memory issues.

What We Will Do#

  • Build independent models for each of the 10 stores.

  • Parallelize training/inference steps.

  • Use LightGBM as the machine learning algorithm.

  • Develop direct multi-step forecasting using LightGBM.

  • Perform backtesting.

Import packages#

# ForecastFlowML: per-group (per-store) forecasting models trained in parallel on Spark.
from forecastflowml.meta_model import ForecastFlowML
from forecastflowml.preprocessing import FeatureExtractor
from forecastflowml.data.loader import load_walmart_m5
from lightgbm import LGBMRegressor
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pandas as pd
import plotly.express as px
import plotly.io as pio
# Render plotly figures inline in the notebook.
pio.renderers.default = "notebook"
# The feature frame is wide; show up to 100 columns when previewing with pandas.
pd.set_option('display.max_columns', 100)

Initialize Spark#

# Local Spark session using 4 cores.
# 8g of driver memory accommodates the sample dataset and the localCheckpoint()
# calls below; 4 shuffle partitions keep small local jobs from spawning excess tasks.
spark = (
    SparkSession.builder.master("local[4]")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.shuffle.partitions", "4")
    .getOrCreate()
)

Sample Dataset#

# Load the bundled Walmart M5 sample as a Spark DataFrame; localCheckpoint()
# materializes it so downstream stages do not recompute the load.
df = load_walmart_m5(spark).localCheckpoint()
# Preview a few rows (limit() on the Spark side keeps the toPandas() transfer small).
df.limit(10).toPandas().head(5)
id item_id dept_id cat_id store_id state_id sales date christmas
0 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 2.0 2011-01-29 0
1 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 5.0 2011-01-30 0
2 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 3.0 2011-01-31 0
3 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 0.0 2011-02-01 0
4 FOODS_1_013_TX_2_evaluation FOODS_1_013 FOODS_1 FOODS TX_2 TX 0.0 2011-02-02 0

Feature Engineering#

# Configure time-series feature engineering over the (id, date, sales) panel.
feature_extractor = FeatureExtractor(
    id_col="id",
    date_col="date",
    target_col="sales",
    lag_window_features={
        # Weekly sales lags: 7, 14, ..., 56 days.
        "lag": [7 * (i + 1) for i in range(8)],
        # Rolling means as [window, lag] pairs: windows of 7/14/30 days,
        # each shifted back by 7/14/21/28 days (e.g. window_7_lag_7_mean).
        "mean": [
            [window, lag] for lag in [7, 14, 21, 28] for window in [7, 14, 30]
        ],
    },
    # Calendar features derived from the date column.
    date_features=[
        "day_of_month",
        "day_of_week",
        "week_of_year",
        "quarter",
        "month",
        "year",
    ],
    # Run length of consecutive occurrences of value 0 at each lag
    # (presumably zero-sales streaks; produces the
    # count_consecutive_value_lag_* columns in the output below — confirm
    # exact semantics against the FeatureExtractor docs).
    count_consecutive_values={
    "value": 0, 
    "lags": [7, 14, 21, 28],
    },
    # Also emit a history_length column (how long each series has existed).
    history_length=True,
)
# Apply the feature pipeline and materialize the result.
df_features = feature_extractor.transform(df).localCheckpoint()
df_features.limit(10).toPandas().head(5)
date id christmas lag_49 lag_42 store_id cat_id dept_id window_14_lag_14_mean window_7_lag_21_mean window_14_lag_28_mean item_id lag_35 count_consecutive_value_lag_7 count_consecutive_value_lag_21 window_30_lag_28_mean window_7_lag_7_mean lag_21 window_30_lag_21_mean lag_28 count_consecutive_value_lag_28 window_30_lag_7_mean lag_7 window_30_lag_14_mean window_7_lag_14_mean window_14_lag_7_mean state_id window_14_lag_21_mean window_7_lag_28_mean lag_56 lag_14 sales count_consecutive_value_lag_14 history_length day_of_month day_of_week week_of_year quarter month year
0 2011-01-31 FOODS_1_011_WI_2_evaluation 0 NaN NaN WI_2 FOODS FOODS_1 NaN NaN NaN FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN WI NaN NaN NaN NaN 2.0 NaN 1 31 2 5 1 1 2011
1 2011-02-01 FOODS_1_011_WI_2_evaluation 0 NaN NaN WI_2 FOODS FOODS_1 NaN NaN NaN FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN WI NaN NaN NaN NaN 0.0 NaN 2 1 3 5 1 2 2011
2 2011-02-02 FOODS_1_011_WI_2_evaluation 0 NaN NaN WI_2 FOODS FOODS_1 NaN NaN NaN FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN WI NaN NaN NaN NaN 0.0 NaN 3 2 4 5 1 2 2011
3 2011-02-03 FOODS_1_011_WI_2_evaluation 0 NaN NaN WI_2 FOODS FOODS_1 NaN NaN NaN FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN WI NaN NaN NaN NaN 0.0 NaN 4 3 5 5 1 2 2011
4 2011-02-04 FOODS_1_011_WI_2_evaluation 0 NaN NaN WI_2 FOODS FOODS_1 NaN NaN NaN FOODS_1_011 NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN NaN WI NaN NaN NaN NaN 0.0 NaN 5 4 6 5 1 2 2011

Split dataset into train and test#

# Time-based split: everything up to 2016-05-22 is training data;
# the following dates form the future period to forecast.
df_train = df_features.filter(F.col("date") <= "2016-05-22")
df_future = df_features.filter(F.col("date") > "2016-05-22")

Training#

# One independent LightGBM pipeline per store (group_col), trained in parallel.
model = ForecastFlowML(
    
    # dataset parameters
    group_col="store_id",
    id_col="id",
    date_col="date",
    target_col="sales",
    date_frequency="days",

    # model parameters: direct multi-step forecasting — a 28-day horizon split
    # into 7-day chunks yields 4 sub-models per store (see the
    # forecast_horizon column in the output below: [1..7], [8..14], ...).
    model_horizon=7,
    max_forecast_horizon=28,
    model=LGBMRegressor(),
)
# Train distributed over stores; the result is a Spark DataFrame holding
# the fitted models plus per-group timing metadata.
trained_models = model.train(df_train).localCheckpoint()
trained_models.limit(10).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:

It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
group forecast_horizon model start_time end_time elapsed_seconds
0 CA_2 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:32) 05-Apr-2023 (22:34:39) 6.8
1 CA_3 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:40) 05-Apr-2023 (22:34:44) 4.5
2 WI_2 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:44) 05-Apr-2023 (22:34:49) 4.2
3 WI_3 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:49) 05-Apr-2023 (22:34:52) 3.7
4 CA_1 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:32) 05-Apr-2023 (22:34:39) 7.1
5 CA_4 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:39) 05-Apr-2023 (22:34:44) 4.8
6 TX_1 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:45) 05-Apr-2023 (22:34:49) 4.4
7 TX_3 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:50) 05-Apr-2023 (22:34:53) 3.8
8 WI_1 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:54) 05-Apr-2023 (22:34:57) 3.8
9 TX_2 [[1, 2, 3, 4, 5, 6, 7], [8, 9, 10, 11, 12, 13,... [[128, 3, 99, 108, 105, 103, 104, 116, 103, 98... 05-Apr-2023 (22:34:32) 05-Apr-2023 (22:34:38) 6.4

Backtesting#

# Backtest over 3 time-based cross-validation splits; returns one forecast
# row per (id, date, cv split) alongside the actual target value.
cv_forecast = model.cross_validate(df_train, n_cv_splits=3).localCheckpoint()
cv_forecast.limit(5).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:

It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
group id date cv target forecast
0 CA_2 FOODS_1_179_CA_2_evaluation 2016-04-25 0 1.0 0.421989
1 CA_2 FOODS_1_179_CA_2_evaluation 2016-04-26 0 0.0 0.423847
2 CA_2 FOODS_1_179_CA_2_evaluation 2016-04-27 0 0.0 0.374551
3 CA_2 FOODS_1_179_CA_2_evaluation 2016-04-28 0 0.0 0.342926
4 CA_2 FOODS_1_179_CA_2_evaluation 2016-04-29 0 0.0 0.378001

Plot cross validation forecasts#

# Aggregate actual sales and per-split CV forecasts to store level for plotting.
cv_state = (
    df_train.select("id", "store_id", "date", "sales")
    # Attach CV forecasts; dates outside the CV windows get null forecasts.
    .join(
        cv_forecast.select("id", "date", "cv", "forecast"),
        on=["id", "date"],
        how="left",
    )
    # Pivot the cv split index into one forecast column per split (0, 1, 2).
    .groupBy("id", "store_id", "date", "sales")
    .pivot("cv")
    .sum("forecast")
    # Sum items up to store level and rename the pivoted columns to cv_0..cv_2.
    .groupBy("store_id", "date")
    .agg(
        F.sum("sales").alias("sales"),
        *[F.sum(f"{i}").alias(f"cv_{i}") for i in range(3)],
    )
    .orderBy("store_id", "date")
).toPandas()
pio.renderers.default = "notebook"
# One facet per store: actual sales overlaid with each CV split's forecast.
fig = px.line(
    cv_state,
    x="date",
    y=["sales", *[f"cv_{i}" for i in range(3)]],
    facet_row_spacing=0.04,
    facet_col = "store_id",
    facet_col_wrap=2,
    height=1000,
    width=720,
)
# Horizontal legend above the facets; tight margins for notebook display.
fig.update_layout(legend=dict(
        orientation="h",
        yanchor="top",
        y=1.07,
        xanchor="center",
        x=0.5),
     margin=dict(l=0, r=10, t=5, b=5),
    legend_title="")
fig.update_traces(line=dict(width=1.7))
# Independent y-axes per store; zoom the x-axis to the last months of training.
fig.update_yaxes(matches=None, title="")
fig.update_xaxes(type="date", range=["2015-11-01", "2016-05-22"])

Inference#

# Forecast the future period using the trained per-store models;
# returns one prediction row per (id, date).
forecast = model.predict(df_future, trained_models)
forecast.limit(5).toPandas()
C:\spark-3.0.0-bin-hadoop3.2\python\pyspark\sql\pandas\group_ops.py:76: UserWarning:

It is preferred to use 'applyInPandas' over this API. This API will be deprecated in the future releases. See SPARK-28264 for more details.
id date prediction
0 FOODS_1_179_CA_2_evaluation 2016-05-23 0.334823
1 FOODS_1_179_CA_2_evaluation 2016-05-24 0.302485
2 FOODS_1_179_CA_2_evaluation 2016-05-25 0.390652
3 FOODS_1_179_CA_2_evaluation 2016-05-26 0.390652
4 FOODS_1_179_CA_2_evaluation 2016-05-27 0.704083

Visualize Predictions#

# Combine historical sales with the future forecasts at store level.
past_future = (
    df.select("id", "store_id", "date", "sales")
    # Left join: past dates keep null predictions, future dates null sales.
    .join(forecast, on=["id", "date"], how="left")
    .groupBy("store_id", "date")
    .agg(
        F.sum("sales").alias("sales"),
        F.sum("prediction").alias("prediction"),
    )
    .orderBy("store_id", "date")
    .toPandas()
)
pio.renderers.default = "notebook"
# One facet per store: historical sales followed by the 28-day forecast.
fig = px.line(
    past_future,
    x="date",
    y=["sales", "prediction"],
    facet_row_spacing=0.04,
    facet_col = "store_id",
    facet_col_wrap=2,
    height=1000,
    width=720,

)
# Horizontal legend above the facets; tight margins for notebook display.
fig.update_layout(legend=dict(
        orientation="h",
        yanchor="top",
        y=1.07,
        xanchor="center",
        x=0.5),
     margin=dict(l=0, r=10, t=5, b=5),
    legend_title="")
fig.update_traces(line=dict(width=1.7))
# Independent y-axes per store; x-range extends past the train cutoff to show forecasts.
fig.update_yaxes(matches=None, title="")
fig.update_xaxes(type="date", range=["2015-11-01", "2016-06-19"])